---
title: "Middleware"
description: "Event stream transformation and filtering for AG-UI agents"
---

# Middleware

The middleware system in `@ag-ui/client` lets you transform, filter, and augment the event streams that flow through agents. Middleware can intercept and modify events, add logging, attach authentication data, filter tool calls, and more.

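```typescript
import { AbstractAgent, Middleware, MiddlewareFunction, FilterToolCallsMiddleware } from "@ag-ui/client"
import { BaseEvent, EventType, RunAgentInput } from "@ag-ui/core"
// The examples on this page also use RxJS creation functions and operators:
import { Observable, catchError, filter, finalize, from, map, of, retry, switchMap, tap, timer } from "rxjs"
```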

## Types

### MiddlewareFunction

A function that transforms the event stream.

```typescript
type MiddlewareFunction = (
  input: RunAgentInput,
  next: AbstractAgent
) => Observable<BaseEvent>
```

### Middleware

Abstract base class for creating middleware.

```typescript
abstract class Middleware {
  abstract run(
    input: RunAgentInput,
    next: AbstractAgent
  ): Observable<BaseEvent>
}
```

## Function-Based Middleware

The simplest way to create middleware is with a function. Function middleware is ideal for stateless transformations.

### Basic Example

```typescript
const loggingMiddleware: MiddlewareFunction = (input, next) => {
  console.log(`[${new Date().toISOString()}] Starting run ${input.runId}`)

  return next.run(input).pipe(
    tap(event => console.log(`Event: ${event.type}`)),
    finalize(() => console.log(`Run ${input.runId} completed`))
  )
}

agent.use(loggingMiddleware)
```

### Transforming Events

```typescript
const prefixMiddleware: MiddlewareFunction = (input, next) => {
  return next.run(input).pipe(
    map(event => {
      if (event.type === EventType.TEXT_MESSAGE_CHUNK) {
        return {
          ...event,
          delta: `[Assistant]: ${event.delta}`
        }
      }
      return event
    })
  )
}
```

### Error Handling

```typescript
const errorMiddleware: MiddlewareFunction = (input, next) => {
  return next.run(input).pipe(
    catchError(error => {
      console.error("Agent error:", error)

      // Return error event
      return of({
        type: EventType.RUN_ERROR,
        message: error.message
      } as BaseEvent)
    })
  )
}
```

## Class-Based Middleware

For stateful operations or complex logic, extend the `Middleware` class.

### Basic Implementation

```typescript
class CounterMiddleware extends Middleware {
  private totalEvents = 0

  run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
    let runEvents = 0

    return next.run(input).pipe(
      tap(() => {
        runEvents++
        this.totalEvents++
      }),
      finalize(() => {
        console.log(`Run events: ${runEvents}, Total: ${this.totalEvents}`)
      })
    )
  }
}

agent.use(new CounterMiddleware())
```

### Configuration-Based Middleware

```typescript
class AuthMiddleware extends Middleware {
  constructor(
    private apiKey: string,
    private headerName: string = "Authorization"
  ) {
    super()
  }

  run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
    // Add authentication to context (each entry is a { description, value } pair)
    const authenticatedInput: RunAgentInput = {
      ...input,
      context: [
        ...input.context,
        {
          description: this.headerName,
          value: `Bearer ${this.apiKey}`
        }
      ]
    }

    return next.run(authenticatedInput)
  }
}

// process.env values are `string | undefined`, so supply a fallback or validate first
agent.use(new AuthMiddleware(process.env.API_KEY ?? ""))
```

## Built-in Middleware

### FilterToolCallsMiddleware

Filters tool calls based on allowed or disallowed lists.

```typescript
import { FilterToolCallsMiddleware } from "@ag-ui/client"
```

#### Configuration

```typescript
type FilterToolCallsConfig =
  | { allowedToolCalls: string[]; disallowedToolCalls?: never }
  | { disallowedToolCalls: string[]; allowedToolCalls?: never }
```

#### Allow Specific Tools

```typescript
const allowFilter = new FilterToolCallsMiddleware({
  allowedToolCalls: ["search", "calculate", "summarize"]
})

agent.use(allowFilter)
```

#### Block Specific Tools

```typescript
const blockFilter = new FilterToolCallsMiddleware({
  disallowedToolCalls: ["delete", "modify", "execute"]
})

agent.use(blockFilter)
```
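
The internals of `FilterToolCallsMiddleware` are not shown on this page, so the following is only a sketch of the allow/block decision under simplified assumptions. The event shapes, `isToolAllowed`, and `filterToolEvents` are hypothetical stand-ins, and plain arrays are used in place of an Observable. The point it illustrates is that only the start event carries the tool name, so a filter has to remember the ids of rejected calls to drop their follow-up events too.

```typescript
// Simplified stand-ins for illustration; real event types live in @ag-ui/core.
type ToolFilterConfig =
  | { allowedToolCalls: string[]; disallowedToolCalls?: never }
  | { disallowedToolCalls: string[]; allowedToolCalls?: never }

type SketchToolEvent =
  | { type: "TOOL_CALL_START"; toolCallId: string; toolCallName: string }
  | { type: "TOOL_CALL_ARGS"; toolCallId: string; delta: string }
  | { type: "TOOL_CALL_END"; toolCallId: string }

// Hypothetical helper: does this tool name pass the configured list?
function isToolAllowed(config: ToolFilterConfig, toolName: string): boolean {
  const allowed = config.allowedToolCalls
  if (allowed !== undefined) {
    return allowed.includes(toolName)
  }
  const blocked = config.disallowedToolCalls ?? []
  return !blocked.includes(toolName)
}

// Hypothetical helper: drop every event that belongs to a rejected tool call.
// ARGS and END events carry only the call id, so rejected ids are remembered.
function filterToolEvents(
  config: ToolFilterConfig,
  events: SketchToolEvent[]
): SketchToolEvent[] {
  const rejectedIds = new Set<string>()
  return events.filter(ev => {
    if (ev.type === "TOOL_CALL_START" && !isToolAllowed(config, ev.toolCallName)) {
      rejectedIds.add(ev.toolCallId)
    }
    return !rejectedIds.has(ev.toolCallId)
  })
}

const sampleToolEvents: SketchToolEvent[] = [
  { type: "TOOL_CALL_START", toolCallId: "c1", toolCallName: "search" },
  { type: "TOOL_CALL_ARGS", toolCallId: "c1", delta: "{}" },
  { type: "TOOL_CALL_END", toolCallId: "c1" },
  { type: "TOOL_CALL_START", toolCallId: "c2", toolCallName: "delete" },
  { type: "TOOL_CALL_END", toolCallId: "c2" }
]

// Both configurations keep only the events of call "c1".
const keptByAllowList = filterToolEvents({ allowedToolCalls: ["search"] }, sampleToolEvents)
const keptByBlockList = filterToolEvents({ disallowedToolCalls: ["delete"] }, sampleToolEvents)
```

An allow list is the safer default when the set of tools can grow, because a newly added tool is rejected until it is listed explicitly.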

## Middleware Patterns

### Timing Middleware

```typescript
const timingMiddleware: MiddlewareFunction = (input, next) => {
  const startTime = performance.now()

  return next.run(input).pipe(
    finalize(() => {
      const duration = performance.now() - startTime
      console.log(`Execution time: ${duration.toFixed(2)}ms`)
    })
  )
}
```

### Rate Limiting

```typescript
class RateLimitMiddleware extends Middleware {
  private lastCall = 0

  constructor(private minInterval: number) {
    super()
  }

  run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
    const now = Date.now()
    const elapsed = now - this.lastCall

    if (elapsed < this.minInterval) {
      // Delay the execution
      return timer(this.minInterval - elapsed).pipe(
        switchMap(() => {
          this.lastCall = Date.now()
          return next.run(input)
        })
      )
    }

    this.lastCall = now
    return next.run(input)
  }
}

// Limit to one request per second
agent.use(new RateLimitMiddleware(1000))
```
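
One case the example above does not cover: runs started back to back all read the same `lastCall` value, so they are delayed by the same amount and then start together. A possible remedy is to reserve a start slot per run. The sketch below shows only that bookkeeping; `SlotScheduler` is a hypothetical helper, not part of `@ag-ui/client`, and it takes the current time as a parameter so it can be tested without timers.

```typescript
// Hypothetical helper, not part of @ag-ui/client.
class SlotScheduler {
  private nextFree = 0
  private readonly minInterval: number

  constructor(minInterval: number) {
    this.minInterval = minInterval
  }

  // Reserve the next free start time and return how long the caller must wait (ms).
  reserve(now: number): number {
    const start = Math.max(now, this.nextFree)
    this.nextFree = start + this.minInterval
    return start - now
  }
}

const slotScheduler = new SlotScheduler(1000)
const slotWaits = [
  slotScheduler.reserve(5000), // first run starts immediately
  slotScheduler.reserve(5000), // second run waits one interval
  slotScheduler.reserve(5000), // third run waits two intervals
  slotScheduler.reserve(9000)  // a later run finds the slot free again
]
console.log(slotWaits) // [0, 1000, 2000, 0]
```

Inside `run`, the returned wait could feed `timer(wait).pipe(switchMap(() => next.run(input)))`, with `Date.now()` passed as `now`.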

### Retry Logic

```typescript
const retryMiddleware: MiddlewareFunction = (input, next) => {
  return next.run(input).pipe(
    retry({
      count: 3,
      delay: (error, retryCount) => {
        console.log(`Retry attempt ${retryCount}`)
        return timer(1000 * retryCount) // Linear backoff: 1s, 2s, 3s
      }
    })
  )
}
```
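
The delay `1000 * retryCount` in the example above grows linearly (1 s, 2 s, 3 s). If exponential backoff is wanted instead, the delay can double on each attempt. The following sketch compares the two schedules; `linearDelay`, `exponentialDelay`, and the 30 s cap are illustrative choices, not part of the library.

```typescript
// Linear schedule, matching `1000 * retryCount`: 1s, 2s, 3s, ...
function linearDelay(retryCount: number, baseMs = 1000): number {
  return baseMs * retryCount
}

// Exponential schedule: doubles on each attempt and is capped at maxMs.
// retryCount is 1-based, so the first retry waits exactly baseMs.
function exponentialDelay(retryCount: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * 2 ** (retryCount - 1))
}

const linearSchedule = [1, 2, 3, 4].map(n => linearDelay(n))
const exponentialSchedule = [1, 2, 3, 4].map(n => exponentialDelay(n))
console.log(linearSchedule)      // [1000, 2000, 3000, 4000]
console.log(exponentialSchedule) // [1000, 2000, 4000, 8000]
```

To use it with the retry middleware, the `delay` callback could return `timer(exponentialDelay(retryCount))`.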

### Caching

```typescript
class CacheMiddleware extends Middleware {
  private cache = new Map<string, BaseEvent[]>()

  run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
    const cacheKey = this.getCacheKey(input)

    if (this.cache.has(cacheKey)) {
      console.log("Cache hit")
      return from(this.cache.get(cacheKey)!)
    }

    const events: BaseEvent[] = []

    return next.run(input).pipe(
      tap({
        next: event => events.push(event),
        // Cache only runs that complete; errored or cancelled runs are skipped
        complete: () => this.cache.set(cacheKey, events)
      })
    )
  }

  private getCacheKey(input: RunAgentInput): string {
    // Create a cache key from the input
    return JSON.stringify({
      messages: input.messages,
      tools: input.tools.map(t => t.name)
    })
  }
}
```

## Chaining Middleware

Multiple middleware can be combined into a single processing pipeline. Middleware passed to `agent.use()` runs in the order given.

```typescript
// Create middleware instances
const logger = loggingMiddleware
const auth = new AuthMiddleware(apiKey)
const filter = new FilterToolCallsMiddleware({
  allowedToolCalls: ["search"]
})
const rateLimit = new RateLimitMiddleware(1000)

// Apply middleware in order
agent.use(
  logger,      // First: Log all events
  auth,        // Second: Add authentication
  rateLimit,   // Third: Apply rate limiting
  filter       // Fourth: Filter tool calls
)

// Execution flow (input travels inward, events travel back out):
// logger → auth → rateLimit → filter → agent → filter → rateLimit → auth → logger
```
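
The execution flow in the comment above can be modeled without any library code. The sketch below is a simplified model, not the actual chaining implementation in `@ag-ui/client`: it uses synchronous calls and string arrays in place of Observables, and `Runner`, `SketchMiddleware`, `chain`, and `named` are hypothetical names. It wraps the agent from the last middleware inward, so the first middleware ends up outermost.

```typescript
// Stand-in for AbstractAgent: takes an input and returns the emitted events.
type Runner = { run(input: string): string[] }
// Stand-in for MiddlewareFunction.
type SketchMiddleware = (input: string, next: Runner) => string[]

// Wrap the agent from the last middleware inward so the first one is outermost.
function chain(agent: Runner, middlewares: SketchMiddleware[]): Runner {
  return middlewares.reduceRight<Runner>(
    (next, mw) => ({ run: input => mw(input, next) }),
    agent
  )
}

const pipelineTrace: string[] = []

// Middleware that records when it is entered and when events pass back through it.
const named = (label: string): SketchMiddleware => (input, next) => {
  pipelineTrace.push(`${label}:in`)
  const events = next.run(input)
  pipelineTrace.push(`${label}:out`)
  return events
}

const baseAgent: Runner = {
  run: () => {
    pipelineTrace.push("agent")
    return ["RUN_STARTED", "RUN_FINISHED"]
  }
}

const pipeline = chain(baseAgent, [named("logger"), named("auth"), named("filter")])
const pipelineEvents = pipeline.run("hello")

console.log(pipelineTrace.join(" → "))
// logger:in → auth:in → filter:in → agent → filter:out → auth:out → logger:out
```

Because the first middleware is outermost, it sees the original input before any other middleware has modified it, and it sees events only after every other middleware has processed them.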

## Advanced Usage

### Conditional Middleware

```typescript
const debugMiddleware: MiddlewareFunction = (input, next) => {
  // Context entries are { description, value } pairs
  const isDebug = input.context.some(c => c.description === "debug")

  if (!isDebug) {
    return next.run(input)
  }

  return next.run(input).pipe(
    tap(event => {
      console.debug("[DEBUG]", JSON.stringify(event, null, 2))
    })
  )
}
```

### Event Filtering

```typescript
const filterEventsMiddleware: MiddlewareFunction = (input, next) => {
  return next.run(input).pipe(
    filter(event => {
      // Only allow specific event types
      return [
        EventType.RUN_STARTED,
        EventType.TEXT_MESSAGE_CHUNK,
        EventType.RUN_FINISHED
      ].includes(event.type)
    })
  )
}
```

### Stream Manipulation

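```typescript
import { bufferTime, concatMap, filter, from } from "rxjs"

const bufferMiddleware: MiddlewareFunction = (input, next) => {
  return next.run(input).pipe(
    // Collect events into 100 ms batches
    bufferTime(100),
    // Skip windows in which nothing was emitted
    filter(batch => batch.length > 0),
    // Re-emit each batch in order so the stream stays Observable<BaseEvent>
    concatMap(batch => from(batch))
  )
}
```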

## Best Practices

1. **Single Responsibility**: Each middleware should focus on one concern
2. **Error Handling**: Always handle errors gracefully and consider recovery strategies
3. **Performance**: Be mindful of processing overhead in high-throughput scenarios
4. **State Management**: Use class-based middleware when state is required
5. **Testing**: Write unit tests for each middleware independently
6. **Documentation**: Document middleware behavior and side effects
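
As one way to apply the single-responsibility and testing points above, the per-event logic of a middleware can live in a pure function that is tested without an agent or a stream. The sketch below assumes a simplified event shape; `SketchEvent` and `prefixDelta` are hypothetical names, not library exports.

```typescript
// Simplified event shape for illustration; real events come from @ag-ui/core.
type SketchEvent = { type: string; delta?: string }

// Hypothetical pure helper holding the per-event logic of a prefixing middleware.
function prefixDelta(ev: SketchEvent, prefix: string): SketchEvent {
  if (ev.type === "TEXT_MESSAGE_CHUNK" && ev.delta !== undefined) {
    return { ...ev, delta: prefix + ev.delta }
  }
  return ev
}

// A unit test needs only plain objects: no agent and no subscription.
const prefixedChunk = prefixDelta({ type: "TEXT_MESSAGE_CHUNK", delta: "Hi" }, "[Assistant]: ")
const untouchedEvent = prefixDelta({ type: "RUN_STARTED" }, "[Assistant]: ")
console.log(prefixedChunk.delta) // "[Assistant]: Hi"
console.log(untouchedEvent.type) // "RUN_STARTED"
```

The middleware itself then shrinks to a single `map` call that applies the helper to each event, which leaves little untested code in the stream wiring.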

## TypeScript Support

The middleware system is fully typed, so editors can check and autocomplete middleware signatures:

```typescript
import {
  AbstractAgent,
  Middleware,
  MiddlewareFunction,
  FilterToolCallsMiddleware
} from "@ag-ui/client"
import { RunAgentInput, BaseEvent, EventType } from "@ag-ui/core"
import { Observable } from "rxjs"

// Type-safe middleware function
const typedMiddleware: MiddlewareFunction = (
  input: RunAgentInput,
  next: AbstractAgent
): Observable<BaseEvent> => {
  return next.run(input)
}

// Type-safe middleware class
class TypedMiddleware extends Middleware {
  run(
    input: RunAgentInput,
    next: AbstractAgent
  ): Observable<BaseEvent> {
    return next.run(input)
  }
}
```